跳到主要内容

Topic 管理

主要配置说明

--bootstrap-server

kafka-topics.sh --bootstrap-server [<broker:port>...]

指定 broker 服务地址。如果当前使用的是集群环境,只需要配置集群中任意一个可达的节点即可。不过为了防止意外情况,还是推荐指定多个。

示例:

bin/kafka-topics.sh \
--bootstrap-server 172.21.11.1:19092,172.21.11.1:29092

--topic <topic_name>

kafka-topics.sh --topic <topic_name>

指定具体 topic 名称,用于配合 --list--create--delete 等参数使用。

--partitions <num>

kafka-topics.sh \
--create \
--topic <topic_name> \
--partitions <num>

创建 topic 时指定分区数。topic 的一个分区同一时刻只能被一个消费者消费,也就是说当分区数越多,同一时刻被消费能力就越强。换句话说,分区数就是消费者并发数。

当生产者将消息投递到 topic 时,消息会被写入其中一个分区。至于写入哪个分区,则是根据消息 key 的 hash 值进行取模计算。也就是说当 key 设计得越合理,消息在分区中越呈现平均分布,消息越平均分布那并发能力就越强。

当然,分区数虽然能增加并发能力,但也不是越多越好。具体可以参考:分区数(partitions)不要乱加,提前规划

--replication-factor <num>

kafka-topics.sh \
--create \
--topic <topic_name> \
--replication-factor <num>

replication-factor 用于指定分区同步副本数。当 topic 根据消息 key 路由将消息写入某个分区,该分区就是 Leader 副本。该配置用于指定分区总副本(备份)数,除了 Leader 副本,其他都是 Follower 副本。生产者将消息投递到 topic 后,首先会将消息写入 Leader 副本,之后再同步到 Follower 副本。

这样能保证消息的高可用,即使 Leader 副本磁盘损坏,也能使用其他 Follower 副本代替。

生产环境中建议至少副本 3 个副本。

--config min.insync.replicas=<num>

kafka-topics.sh \
--create \
--topic <topic_name> \
--config min.insync.replicas=<num>

min.insync.replicas 指的是最少同步副本数。默认情况下,生产者将消息投递到 topic,写入 Leader 副本后就认为消息投递成功了,至于 Follower 副本有没有同步成功并不关心。该配置用于指定当消息写入分区后,最少同步指定个副本(含 Leader 副本),才认为消息投递 topic 成功,依此来达到真正的高可用。

需要注意的是,min.insync.replicas 设置的最少同步副本数不能大于 replication-factor。

--config retention.ms=<ms>

kafka-topics.sh \
--create \
--topic <topic_name> \
--config retention.ms=604800000

Kafka 的日志由许多 segment 文件组成,生产者将消息写入 topic 后,消息不会立马删除,而是依据提供的保留策略根据条件慢慢清理。

retention.ms 是最直接的保留时间。意思是:一条消息从写入开始,到达到 retention.ms 后,就允许 Kafka 将其删除。

注意是“允许”,不是“立刻删除”,只是被标记为可删除。真正的清理会在 log cleaner 或 log retention 线程运行时执行。

例如:

kafka-topics.sh --create \
--topic <topic_name> \
--config retention.ms=7

那么消息大致会在 7 天后被标记为可清除。

Note

retention.ms 的单位是 毫秒,这里设置 7天 只是便于理解。另外,-1 表示永久保存。

--config segment.bytes=<bytes>

kafka-topics.sh --create \
--topic <topic_name> \
--config segment.bytes=1073741824

控制 topic 日志大小,如果 topic 所有 segment 累计大小超过 retention.bytes,Kafka 会从最老的 segment 往前删。

例如:

kafka-topics.sh --create \
--topic <topic_name> \
--config retention.bytes=1GB

当前 topic 日志有 1.2GB,旧的 segment 会被删除直到重新低于 1GB。

retention.bytesretention.ms 谁先触发,就按谁来删除。

Note

retention.bytes 的单位是 byte,这里设置 1GB 只是便于理解。

--config cleanup.policy=<delete,compact>

kafka-topics.sh --create \
--topic <topic_name> \
--config cleanup.policy=delete,compact

控制日志如何“老去”。

  • delete:按时间/大小进行删除
  • compact:按 key 去重,只保留每个 key 的最后一条消息

简单地说就是:

cleanup.policy=delete  ## 按时间/空间删
cleanup.policy=compact ## 按 key 保留最新版本

另外,也可以设置成 delete,compact 的组合:

kafka-topics.sh --create \
--topic <topic_name> \
--config cleanup.policy=delete,compact

另外,如果不理解 compact 真正的含义,建议无脑用 delete 准没错。

--config delete.retention.ms=<ms>

kafka-topics.sh --create \
--topic <topic_name> \
--config cleanup.policy=compact
--config delete.retention.ms=604800000

该参数是对 cleanup.policy=compact 的扩充,在 cleanup.policy=delete 模式下不起作用。

compact 模式运行一个特殊线程叫 Log Cleaner,它的任务是扫描日志,把所有旧的 key 替换掉,只保留每个 key 的最新值。

比如生产者按照时间顺序依次向 topic 投递如下消息:

key=A, value=1
key=A, value=2
key=A, value=3

compact 模式下最终只会保留:

key=A, value=3

要删除一条 key,Kafka 写入的不是“删掉这条”,而是:

key=A, value=null → 这叫 tombstone

null 值 = 删除标记,Kafka 看到它就知道你想删除 key=A。

delete.retention.ms → 控制 tombstone 保留多久

Kafka 不会看到 tombstone 就立即删对应的旧数据。它会先把 tombstone 保留 一段时间,让 consumer 有机会看到 key 真的被删除过。

这段“墓碑停留时间”就是 delete.retention.ms,默认一般是 86400000ms(24 小时)。

划重点

重要的事情再说一遍,如果不能理解 --config cleanup.policy=compact 的含义,建议无脑用 --config cleanup.policy=delete

创建 topic

通用的创建 topic 命令参数如下,可根据需要自行指定:

了解过前面 topic 的主要配置之后,再去创建 topic 就得心应手了,通用的创建命令如下,可根据需要自行指定:

kafka-topics.sh \
--bootstrap-server [<broker:port>...] \
--create \
--topic <topic_name> \
--partitions <num> \ # 分区数, 根据吞吐量评估, 通常至少为 3
--replication-factor <num> \ # 每个分区同步副本数, 生产环境建议至少为 3 个
--config min.insync.replicas=<num> # 消息最小同步分区副本数, 保证高可用, 消息至少同步指定个副本, 才认为生产者消息投递 topic 成功, 建议至少 2 个
--config retention.ms=604800000 \ # 日志保留时长(7天), -1 表示永久保留
--config segment.bytes=1073741824 # 单日志分段最大大小(1GB)
--config cleanup.policy=delete # 日志处理策略, 根据需求选择 delete 或 compact

示例:

bin/kafka-topics.sh \
--bootstrap-server 172.21.11.1:19092,172.21.11.1:29092 \
--create \
--topic order.paid \
--partitions 3 \
--replication-factor 3 \
--config min.insync.replicas=3 \
--config cleanup.policy=delete \
--config retention.ms=2592000000

输出结果:

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic order.paid. <== topic 创建成功

前面的 WARNING 并不是错误,而是 KAFKA 友善的提醒你:在创建 topic 时不要混用 .-

这事源自 Kafka 的度量指标(metrics)名字会把 topic 名嵌进去,而早期某些系统会把 . 和 _ 都当成同一个分隔符。

也就是说创建 topic 时 order.paid 和 order_paid 可能会生成相同的 metric 名,造成“撞名”。这并不是什么错误,仅仅只是友善的提示你不要同时混用 .-,尽量保持统一的命名规范。

列出所有 topic

如果想要查看 kafka 中有多少 topic,可以使用 --list 查看:

$ bin/kafka-topics.sh \
--bootstrap-server 172.21.11.1:19092,172.21.11.1:29092 \
--list

order.paid

查看 topic 详细信息

如果想要查看 topic 的详细信息(如分区、副本数、最小同步策略)以及创建 topic 时指定的配置参数,可以使用 --describe 查看:

$ bin/kafka-topics.sh \
--bootstrap-server 172.21.11.1:19092,172.21.11.1:29092 \
--topic order.paid \
--describe

Topic: order.paid TopicId: tVFQoD0UR4CvWrIgLU0bDA PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=3,cleanup.policy=delete,segment.bytes=1073741824,retention.ms=2592000000,unclean.leader.election.enable=false
Topic: order.paid Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Elr: LastKnownElr:
Topic: order.paid Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Elr: LastKnownElr:
Topic: order.paid Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Elr: LastKnownElr:

删除 topic

如果某个 topic 创建错了,可以使用 --delete 删除:

$ bin/kafka-topics.sh \
--bootstrap-server 172.21.11.1:19092,172.21.11.1:29092 \
--delete \
--topic order.paid
NOTE

如果是在生产环境中,千万不要轻易的使用 --delete。因此,当你的业务需要使用一个新 topic 时,在创建之前一定要合理的创建分区,同时循命名规范并仔细斟酌并再去创建有业务含义的 topic 名称。一旦创建,就不要轻易的删除。

关于 broker 配置说明

禁用自动创建 topic

如果当前运行的是 cluster 模式,在集群启动之前,需要将所有的选举节点都设置为禁止自动创建 topic。也就是在 server.properties 都做如下配置:

auto.create.topics.enable=false

所谓的选举节点,就是 server.properties 的指定 roles 为 controller 的节点(下面两种配置都表示 broker 是选举节点):

process.roles=controller
process.roles=broker,controller

如果是 standalone 模式,只需要在 server.properties 中添加该配置即可。

broker 设置副本默认策略

在创建 topic 时,为了保证高可用,通常会设置多个消息副本(--replication-factor <num>),也就是说消息写入 Leader 之后会继续将消息同步到其他副本。默认情况下,消息写入 Leader 副本成功之后就认为生产者将消息投递 topic 成功了。

而为了保证真正的高可用(防止消息丢失),在创建 topic 时通常还会指定最少同步副本数(--config min.insync.replicas=<num>)。该配置解决的问题是,当消息写入 Leader 副本之后,还要继续同步其他副本,只有当消息至少成功写入 min.insync.replicas 个副本(含 Leader)才认为消息写入 topic 成功。

为了防止创建 topic 时遗漏,我们可以直接在 broker 的配置文件 server.properties 中设置默认的最小同步副本数:

min.insync.replicas=<num>

当然,如果是 cluster 模式,需要在所有的 broker 节点都设置。在 broker 级别设置,主要是增加一层保险而已。